package com.liang.xiao.mrexample1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * MapReduce practice example: for every phone number, aggregates the upstream and
 * downstream traffic (and their total) found in a tab-separated input file.
 *
 * <p>Usage: {@code DataCount <input path> <output path> <number of reduce tasks>}
 *
 * <p>NOTE(review): the per-number total is presumably derived inside {@code DataBean}
 * (not visible here) from the up/down sums passed to it — confirm.
 *
 * @author: xiaoliang.liu
 * @date: 2018/4/21 18:38  v1.0
 */
public class DataCount {

    /**
     * Configures and runs the job, then exits the JVM with the job's status.
     *
     * <p>Exit codes: 0 when the job succeeds, 1 when it fails or cannot be submitted,
     * 2 when the command-line arguments are invalid.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path,
     *             {@code args[2]} number of reduce tasks (non-negative integer)
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: DataCount <input path> <output path> <number of reduce tasks>");
            System.exit(2);
            return;
        }

        int numReduceTasks;
        try {
            numReduceTasks = Integer.parseInt(args[2].trim());
        } catch (NumberFormatException e) {
            numReduceTasks = -1;
        }
        if (numReduceTasks < 0) {
            System.err.println("Number of reduce tasks must be a non-negative integer, got: " + args[2]);
            System.exit(2);
            return;
        }

        boolean succeeded = false;
        try {
            Job job = Job.getInstance(new Configuration(), "DataCount");
            job.setJarByClass(DataCount.class);

            // Map phase. The map output key/value classes are not set explicitly because
            // they are identical to the job output classes configured below.
            job.setMapperClass(DCMapper.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));

            // Reduce phase.
            job.setReducerClass(DCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DataBean.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Partitioning by phone-number prefix; DCPartitioner folds its partition codes
            // into whatever reducer count is requested here.
            job.setPartitionerClass(DCPartitioner.class);
            job.setNumReduceTasks(numReduceTasks);

            // Submit and block until the job finishes.
            succeeded = job.waitForCompletion(true);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for anything that still runs on this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }

        // Propagate failure to the calling shell/scheduler instead of always exiting 0.
        System.exit(succeeded ? 0 : 1);
    }


    /**
     * Map step: parses one tab-separated input line and emits
     * {@code (phone number, DataBean(phone number, up, down))}.
     *
     * <p>Lines with too few columns or non-numeric traffic values are skipped and counted
     * under the {@code DataCount / MALFORMED_LINES} counter instead of failing the task.
     */
    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {

        /** Counter group used to report skipped input lines. */
        private static final String COUNTER_GROUP = "DataCount";
        /** Counter name for lines that could not be parsed. */
        private static final String MALFORMED_LINES = "MALFORMED_LINES";

        /** Column index of the phone number. */
        private static final int TEL_NO_FIELD = 1;
        /** Column index of the upstream traffic value. */
        private static final int UP_FIELD = 8;
        /** Column index of the downstream traffic value. */
        private static final int DOWN_FIELD = 9;
        /** Minimum number of columns a line needs for all indexes above to be valid. */
        private static final int MIN_FIELDS = DOWN_FIELD + 1;

        /**
         * Output key reused across map() calls, following the common Hadoop idiom
         * (as in the WordCount tutorial) of reusing Writable outputs.
         */
        private final Text telNoKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");

            // Too few columns (e.g. a blank or truncated line): skip instead of throwing.
            if (fields.length < MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }

            String telNo = fields[TEL_NO_FIELD];
            long up;
            long down;
            try {
                up = Long.parseLong(fields[UP_FIELD].trim());
                down = Long.parseLong(fields[DOWN_FIELD].trim());
            } catch (NumberFormatException e) {
                // Non-numeric traffic (e.g. a header line): skip instead of failing the task.
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }

            telNoKey.set(telNo);
            context.write(telNoKey, new DataBean(telNo, up, down));
        }

    }

    /**
     * Partitioner: routes each phone number to a reducer based on its first three digits.
     *
     * <p>Prefix groups: 135-139 -> 1, 150/159 -> 2, 182/183 -> 3, anything else
     * (including keys shorter than three characters) -> 0. The group code is reduced
     * modulo the number of partitions, so the result is always in
     * {@code [0, partitionNum)}. With four or more reducers each group gets its own
     * reducer, exactly as before; with fewer, groups share reducers.
     */
    public static class DCPartitioner extends Partitioner<Text, DataBean> {

        /** Number of leading characters of the key used for the lookup. */
        private static final int PREFIX_LENGTH = 3;

        /** Group code for prefixes that are not listed in PREFIX_TO_PARTITION. */
        private static final int DEFAULT_PARTITION = 0;

        /** Read-only mapping from phone-number prefix to partition group code. */
        private static final Map<String, Integer> PREFIX_TO_PARTITION;

        static {
            Map<String, Integer> prefixes = new HashMap<String, Integer>();
            prefixes.put("135", 1);
            prefixes.put("136", 1);
            prefixes.put("137", 1);
            prefixes.put("138", 1);
            prefixes.put("139", 1);
            prefixes.put("150", 2);
            prefixes.put("159", 2);
            prefixes.put("182", 3);
            prefixes.put("183", 3);
            PREFIX_TO_PARTITION = Collections.unmodifiableMap(prefixes);
        }

        /**
         * @param key          phone number
         * @param value        aggregated traffic record (not used for routing)
         * @param partitionNum number of partitions (reduce tasks)
         * @return a partition index in {@code [0, partitionNum)}, or 0 when
         *         {@code partitionNum <= 1}
         */
        @Override
        public int getPartition(Text key, DataBean value, int partitionNum) {
            if (partitionNum <= 1) {
                return 0;
            }

            String account = key.toString();
            Integer code = null;
            if (account.length() >= PREFIX_LENGTH) {
                code = PREFIX_TO_PARTITION.get(account.substring(0, PREFIX_LENGTH));
            }
            int group = (code == null) ? DEFAULT_PARTITION : code;

            // Returning a value >= partitionNum would make the framework reject the record,
            // so fold the group code into the valid range.
            return group % partitionNum;
        }
    }


    /**
     * Reduce step: sums the upstream and downstream traffic of all records for one
     * phone number and emits {@code (phone number, DataBean("", upSum, downSum))}.
     */
    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {
        @Override
        protected void reduce(Text key, Iterable<DataBean> values, Context context) throws IOException, InterruptedException {
            // Primitive accumulators avoid boxing a new Long on every iteration.
            long upSum = 0L;
            long downSum = 0L;

            for (DataBean bean : values) {
                upSum += bean.getUpPayLoad();
                downSum += bean.getDownPayLoad();
            }

            // The phone number is already the output key, so the bean's own number is left empty.
            context.write(key, new DataBean("", upSum, downSum));
        }
    }
}
